package com.jie.flink.cdc.flinksink;

import com.jie.flink.cdc.flinksink.config.PulsarConfigProperties;
import java.util.Objects;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;

/**
 * Singleton {@link FlinkSinkBuilder} that creates a Pulsar-backed {@link Sink} for
 * {@code String} records from a {@link PulsarConfigProperties}.
 *
 * <p>The class declares no instance fields; obtain the shared instance through
 * {@link #getInstance()}.
 *
 * @author zhanggj
 * @date 2023/5/29 10:19
 */
public class PulsarFlinkSinkBuilder implements FlinkSinkBuilder<PulsarConfigProperties> {

    /** Private so that the only instance is the one exposed by {@link #getInstance()}. */
    private PulsarFlinkSinkBuilder() {
    }

    /**
     * Holds the shared instance. The field is created when this nested class is
     * initialized, i.e. on the first call to {@link #getInstance()}, and is {@code final}
     * so the reference can never be reassigned afterwards.
     */
    private static class InstanceHolder {
        private static final PulsarFlinkSinkBuilder INSTANCE = new PulsarFlinkSinkBuilder();
    }

    /**
     * Returns the shared builder instance.
     *
     * @return the single {@code PulsarFlinkSinkBuilder}
     */
    public static PulsarFlinkSinkBuilder getInstance() {
        return InstanceHolder.INSTANCE;
    }

    /**
     * Builds a {@link PulsarSink} that writes {@code String} records to the topic named in
     * the given configuration, using the configuration's admin URL and service URL.
     * Records are serialized with a {@link SimpleStringSchema} wrapped by
     * {@link PulsarSerializationSchema#flinkSchema}.
     *
     * @param pulsarConfigProperties source of the admin URL, service URL and topic
     * @return the configured sink
     * @throws NullPointerException if {@code pulsarConfigProperties} is {@code null}
     */
    @Override
    public Sink<String> buildSink(final PulsarConfigProperties pulsarConfigProperties) {
        // Fail fast with a descriptive message instead of an anonymous NPE in the chain below.
        Objects.requireNonNull(pulsarConfigProperties, "pulsarConfigProperties must not be null");
        return PulsarSink.<String>builder()
                .setAdminUrl(pulsarConfigProperties.getAdminUrl())
                .setServiceUrl(pulsarConfigProperties.getServiceUrl())
                .setTopics(pulsarConfigProperties.getTopic())
                .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
                .build();
    }
}
